package cn.org.wangchangjiu.redis.delay;

import com.alibaba.fastjson.JSON;
import lombok.SneakyThrows;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

/**
 * Binds a listener method annotated with {@link RedisDelayMessageListener} to the bean
 * that declares it, and dispatches delayed messages to that method through reflection.
 *
 * <p>The listener method must declare one or two parameters:
 * <ul>
 *   <li>the first receives the message body, deserialized from JSON;</li>
 *   <li>the optional second must be assignable to {@link Acknowledgment} and receives a
 *       handle whose {@code acknowledge()} calls {@code DelayQueue.ackMessage(...)}.</li>
 * </ul>
 *
 * <p>Created by wangchangjiu on 2023/7/3 21:14.
 */
public class RedisDelayMessageConsumerContainer {

    /**
     * The bean that owns the consumer method.
     */
    private final Object bean;

    /**
     * The business consumer method invoked for each message.
     */
    private final Method method;

    /**
     * {@code true} when the listener method takes only the message body; {@code false}
     * when it also declares an {@link Acknowledgment} parameter.
     */
    private final boolean autoAck;

    /**
     * The listener annotation found on the consumer method.
     */
    private final RedisDelayMessageListener redisDelayMessageListener;

    /**
     * Creates a container and validates the listener configuration and method signature.
     *
     * @param bean                      the object on which {@code method} is invoked
     * @param method                    the listener method; must declare one or two parameters
     * @param redisDelayMessageListener the annotation instance; its {@code topic()} must contain text
     * @throws IllegalArgumentException if the topic is blank, the parameter count is not one or two,
     *                                  or the second parameter is not assignable to {@link Acknowledgment}
     * @throws NullPointerException     if {@code method} or {@code redisDelayMessageListener} is {@code null}
     */
    public RedisDelayMessageConsumerContainer(Object bean, Method method, RedisDelayMessageListener redisDelayMessageListener) {
        this.bean = bean;
        this.method = method;
        this.redisDelayMessageListener = redisDelayMessageListener;

        if (!StringUtils.hasText(redisDelayMessageListener.topic())) {
            throw new IllegalArgumentException("RedisDelayMessageListener topic cannot be empty");
        }

        Class<?>[] parameterTypes = method.getParameterTypes();
        if (parameterTypes.length == 0 || parameterTypes.length > 2) {
            throw new IllegalArgumentException("method parameters can only be one or two");
        }

        // NOTE(review): this accepts any subtype of Acknowledgment, but invoke() always passes a
        // ConsumerAcknowledgment, so a parameter declared as some other Acknowledgment subtype
        // would pass this check and then fail inside Method.invoke. Confirm whether the check
        // should be restricted to Acknowledgment itself.
        if (parameterTypes.length == 2 && !Acknowledgment.class.isAssignableFrom(parameterTypes[1])) {
            throw new IllegalArgumentException("The second parameter type must be of type Acknowledgement or its subclass");
        }

        // A single-parameter listener has no way to acknowledge by itself.
        this.autoAck = parameterTypes.length == 1;
    }

    /**
     * Deserializes the message body into {@code aClass} and invokes the listener method with it.
     * When the listener declares a second parameter, a new {@link Acknowledgment} bound to
     * {@code delayQueue} and to this message's id, register service and topic is passed as well.
     *
     * <p>Because of {@code @SneakyThrows}, checked exceptions raised by the reflective call
     * (for example {@link java.lang.reflect.InvocationTargetException} wrapping a listener
     * failure) propagate to the caller without being declared.
     *
     * @param message    the delayed message to deliver
     * @param aClass     the target type for JSON deserialization of the message body
     * @param delayQueue the queue used by the acknowledgment handle to ack the message
     */
    @SneakyThrows
    public void invoke(RedisDelayMessage message, Class<?> aClass, DelayQueue delayQueue) {
        Object messageBody = JSON.parseObject(message.getMessageBody(), aClass);
        if (autoAck) {
            method.invoke(bean, messageBody);
            return;
        }
        Acknowledgment acknowledgment = new ConsumerAcknowledgment(
                delayQueue, message.getMessageId(), message.getRegisterService(), message.getTopic());
        method.invoke(bean, messageBody, acknowledgment);
    }

    /**
     * Reports whether the listener method lacks an {@link Acknowledgment} parameter.
     *
     * @return {@code true} if the listener takes only the message body, {@code false} if it
     *         also takes an {@link Acknowledgment}
     */
    public Boolean getAutoAck() {
        return autoAck;
    }

    /**
     * {@link Acknowledgment} handed to two-parameter listeners; acknowledging it forwards the
     * captured message coordinates to {@code DelayQueue.ackMessage(...)}.
     *
     * <p>Declared {@code static} because it uses no state of the enclosing container.
     */
    private static final class ConsumerAcknowledgment implements Acknowledgment {

        private final DelayQueue delayQueue;

        private final String messageId;
        private final String registerService;
        private final String topic;

        ConsumerAcknowledgment(DelayQueue delayQueue, String messageId, String registerService, String topic) {
            this.delayQueue = delayQueue;
            this.messageId = messageId;
            this.registerService = registerService;
            this.topic = topic;
        }

        /**
         * Acknowledges the captured message on the delay queue.
         */
        @Override
        public void acknowledge() {
            delayQueue.ackMessage(this.registerService, this.topic, this.messageId);
        }

        @Override
        public String toString() {
            return "Acknowledgment for registerService:" + this.registerService + " topic:" + this.topic + " messageId:" + this.messageId;
        }

    }
}
